
/*
 * Copyright © 2021 https://www.cestc.cn/ All rights reserved.
 */

package com.zx.learn.flink.sink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 1.ds.print 直接输出到控制台
 * 2.ds.printToErr() 直接输出到控制台,用红色
 * 3.ds.collect 将分布式数据收集为本地集合
 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
 */
public class SinkWriteAsText {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");

        //3.transformation
        //4.sink
        //4.1 控制台输出
        ds.print();
        ds.printToErr();
        //4.2 输出到文件
        ds.writeAsText("/data/output/words.txt", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
        //
        /*ds.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {

            }
        })*/
        //注意:
        //Parallelism=1为文件
        //Parallelism>1为文件夹

        //5.execute
        env.execute();
    }
}
